package com.three.netty.core.mq;

import com.three.api.spi.push.MessagePusher;
import com.three.api.spi.push.MessagePusherFactory;
import com.three.tools.Utils;
import com.three.tools.thread.pool.ThreadPoolManager;

import java.util.Collection;

/**
 * Pulls {@link MQPushMessage}s for this node's topic from an {@link MQClient}
 * and hands each one to a {@link MessagePusher}.
 *
 * <p>The topic name is {@code "/chess/push/"} followed by the value returned by
 * {@code Utils.getLocalIp()}.
 *
 * Created by mathua on 2017/5/30.
 */
public final class MQMessageReceiver {

    /** Topic this receiver subscribes to and polls. */
    private static final String TOPIC = "/chess/push/" + Utils.getLocalIp();

    /** Pause between polls when the client returned no messages, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 100;

    /** Pusher that every received message is forwarded to. */
    public final MessagePusher pusher = MessagePusherFactory.create();

    private final MQClient mqClient;

    /**
     * Creates a receiver for the given client, registers it on {@link #TOPIC}
     * and then calls {@link #fetchFormMQ()}.
     *
     * @param mqClient client to subscribe with and to poll messages from
     */
    public static void subscribe(MQClient mqClient) {
        MQMessageReceiver receiver = new MQMessageReceiver(mqClient);
        mqClient.subscribe(TOPIC, receiver);
        receiver.fetchFormMQ();
    }

    /**
     * @param mqClient client that {@link #fetchFormMQ()} polls for messages
     */
    public MQMessageReceiver(MQClient mqClient) {
        this.mqClient = mqClient;
    }

    /**
     * Forwards a single message to {@link #pusher}.
     *
     * @param message message to push
     */
    public void onMessage(MQPushMessage message) {
        pusher.push(message);
    }

    /**
     * Asks the shared {@code ThreadPoolManager} for a thread named {@code "mq-push"}
     * whose task is the polling loop.
     */
    public void fetchFormMQ() {
        // NOTE(review): whatever newThread returns is discarded here. Confirm that
        // ThreadPoolManager.newThread starts the thread itself; if it only creates
        // it, the polling loop never runs.
        ThreadPoolManager.I.newThread("mq-push", this::dispatch);
    }

    /**
     * Polling loop: takes the pending messages for {@link #TOPIC}, forwards each to
     * {@link #onMessage(MQPushMessage)}, and sleeps briefly when nothing was returned.
     *
     * <p>The loop ends when the running thread is interrupted. The interrupt flag is
     * restored before returning so the code that owns the thread can observe it.
     */
    private void dispatch() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Collection<MQPushMessage> messages = mqClient.take(TOPIC);
                if (messages == null || messages.isEmpty()) {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                messages.forEach(this::onMessage);
            }
        } catch (InterruptedException e) {
            // Interruption is a request to stop. Restarting the loop recursively (as
            // before) discarded that request and grew the stack on every interrupt.
            Thread.currentThread().interrupt();
        }
    }
}

